package com.study.flink.java.day07_join_count;

import com.study.flink.java.day07_join_count.source.StreamDataSourceA;
import com.study.flink.java.day07_join_count.source.StreamDataSourceB;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * 使用EventTime划分窗口实现双流Join
 */
public class FlinkTumblingWindowsLeftJoinDemo {

    public static void main(String[] args) throws Exception {
        int windowsSize = 10;
        long delay = 5002L;
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置EventTime作为时间标准
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        //设置数据源
        //第一个流（左流）
        DataStream<Tuple3<String, String, Long>> leftSource = env.addSource(new StreamDataSourceA()).name("demo source A");
        //第二个流（右流）
        DataStream<Tuple3<String, String, Long>> rightSource = env.addSource(new StreamDataSourceB()).name("demo source B");

        // 水位线
        //("a","1",1000)
        SingleOutputStreamOperator<Tuple3<String, String, Long>> leftStream = leftSource.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, String, Long>>(Time.milliseconds(delay)) {
            @Override
            public long extractTimestamp(Tuple3<String, String, Long> element) {
                return element.f2;
            }
        });
        //("a","hangzhou",6000)
        SingleOutputStreamOperator<Tuple3<String, String, Long>> rightStream = rightSource.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, String, Long>>(Time.milliseconds(delay)) {
            @Override
            public long extractTimestamp(Tuple3<String, String, Long> element) {
                return element.f2;
            }
        });

        // left join操作
        leftStream.coGroup(rightStream)
                .where(new LeftSelectKey())
                .equalTo(new RightSelectKey())
                .window(TumblingEventTimeWindows.of(Time.seconds(windowsSize)))
                .apply(new LeftJoin()).print();

        env.execute("FlinkTumblingWindowsInnerLeftDemo");

    }

    public static class LeftJoin implements CoGroupFunction<Tuple3<String, String, Long>, Tuple3<String, String, Long>, Tuple5<String, String, String, Long, Long>> {
        // coGroup左流的数据和有流的数据取出来，可以将key相同，并且在同一个窗口的数据取出来
        @Override
        public void coGroup(Iterable<Tuple3<String, String, Long>> leftElements, Iterable<Tuple3<String, String, Long>> rightElements, Collector<Tuple5<String, String, String, Long, Long>> out) throws Exception {
            // leftElements是左流的数据
            for(Tuple3<String, String, Long> leftElem: leftElements) {
                boolean hadElemets = false;
                // 如果左边的流join上了右边的流rightElements就不为空
                for(Tuple3<String, String, Long> rightElem: rightElements) {
                    //将join上的数据输出
                    out.collect(new Tuple5<>(leftElem.f0, leftElem.f1, rightElem.f1, leftElem.f2, rightElem.f2));
                    hadElemets = true;
                }
                if(!hadElemets) {
                    // 没join上，给右边的数据赋空值
                    out.collect(new Tuple5<>(leftElem.f0, leftElem.f1, "null", leftElem.f2, -1L));
                }

            }
        }
    }

    public static class LeftSelectKey implements KeySelector<Tuple3<String, String, Long>, String> {
        @Override
        public String getKey(Tuple3<String, String, Long> w) throws Exception {
            return w.f0;
        }
    }

    public static class RightSelectKey implements KeySelector<Tuple3<String, String, Long>, String> {
        @Override
        public String getKey(Tuple3<String, String, Long> w) throws Exception {
            return w.f0;
        }
    }

}
